import org.apache.flink.table.functions.AggregateFunction;

// Aggregate function that takes a single INT value, keeps the running total in a structured
// type of WeightedAvgAccumulator, and returns that total as BIGINT (despite the accumulator's
// name, no weighting or averaging is applied here).
/**
 * Flink aggregate function that adds up {@code Integer} inputs in a
 * {@code WeightedAvgAccumulator} and reports the running total as a {@code Long}.
 *
 * <p>Only the accumulator's {@code sum} field is read or written; no weighting or averaging
 * is performed. {@code null} inputs are ignored rather than unboxed, so a NULL column value
 * does not abort the aggregation with a {@link NullPointerException}.
 */
public class MyUdagg extends AggregateFunction<Long, WeightedAvgAccumulator>
{

    /**
     * Creates the accumulator that holds the intermediate total.
     *
     * @return a new {@code WeightedAvgAccumulator}
     */
    @Override
    public WeightedAvgAccumulator createAccumulator()
    {
        return new WeightedAvgAccumulator();
    }

    /**
     * Produces the aggregation result from the accumulator.
     *
     * <p>Because only the total is tracked, a total of exactly zero cannot be told apart from
     * "nothing accumulated" (e.g. right after {@link #resetAccumulator}); both yield
     * {@code null}.
     *
     * @param acc the accumulator to read
     * @return the accumulated total, or {@code null} when that total is zero
     */
    @Override
    public Long getValue(WeightedAvgAccumulator acc)
    {
        if (acc.sum == 0) {
            return null;
        }
        return acc.sum;
    }

    /**
     * Adds one input value to the running total.
     *
     * @param acc the accumulator to update
     * @param iValue the value to add; {@code null} is skipped
     */
    public void accumulate(WeightedAvgAccumulator acc, Integer iValue)
    {
        // Unboxing a null Integer would throw, so a null input leaves the total untouched.
        if (iValue == null) {
            return;
        }
        acc.sum += iValue;
    }

    /**
     * Removes one previously accumulated value from the running total.
     *
     * @param acc the accumulator to update
     * @param iValue the value to subtract; {@code null} is skipped, mirroring
     *     {@link #accumulate}
     */
    public void retract(WeightedAvgAccumulator acc, Integer iValue)
    {
        // A null was never added by accumulate, so there is nothing to take back.
        if (iValue == null) {
            return;
        }
        acc.sum -= iValue;
    }

    /**
     * Folds the totals of several accumulators into {@code acc}.
     *
     * @param acc the accumulator that receives the combined total
     * @param it the accumulators whose totals are added to {@code acc}
     */
    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it)
    {
        for (WeightedAvgAccumulator other : it) {
            acc.sum += other.sum;
        }
    }

    /**
     * Resets the accumulator's total to zero.
     *
     * @param acc the accumulator to reset
     */
    public void resetAccumulator(WeightedAvgAccumulator acc)
    {
        acc.sum = 0L;
    }
}